package com.carrot.mq.mqclient;

import com.carrot.mq.common.*;
import com.carrot.mq.mqserver.core.BasicProperties;
import com.carrot.mq.mqserver.core.ExchangeType;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Represents a logical connection that belongs to a {@link Connection}.
 *
 * <p>Every public operation corresponds to one server-side API: it serializes an argument object,
 * sends it as a typed {@code Request} through the owning connection, and then blocks until a
 * response carrying the same rid has been delivered via {@link #putReturns(BasicReturns)}.
 *
 * <p>Responses are expected to be delivered by a thread other than the one that issued the
 * request (the requesting thread is blocked while waiting); the monitor of this object is used to
 * coordinate the two.
 *
 * @author 兴趣使然黄小黄
 * @version 1.0
 * @date 2023/8/29 19:43
 */
public class Channel {

    // Request type codes understood by the server, one per API.
    private static final int TYPE_CREATE_CHANNEL = 0x1;
    private static final int TYPE_CLOSE_CHANNEL = 0x2;
    private static final int TYPE_EXCHANGE_DECLARE = 0x3;
    private static final int TYPE_EXCHANGE_DELETE = 0x4;
    private static final int TYPE_QUEUE_DECLARE = 0x5;
    private static final int TYPE_QUEUE_DELETE = 0x6;
    private static final int TYPE_QUEUE_BIND = 0x7;
    private static final int TYPE_QUEUE_UNBIND = 0x8;
    private static final int TYPE_BASIC_PUBLISH = 0x9;
    private static final int TYPE_BASIC_CONSUME = 0xa;
    private static final int TYPE_BASIC_ACK = 0xb;

    // Identity of this channel.
    private String channelId;

    // The connection this channel belongs to.
    private Connection connection;

    // Responses received from the server, keyed by rid, until the waiting caller picks them up.
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();

    // Callback to invoke when a message for the subscribed queue comes back from the server.
    // By convention a channel has at most one callback.
    private Consumer consumer = null;

    /**
     * Creates a channel object bound to the given connection. This does not contact the server;
     * call {@link #createChannel()} for that.
     *
     * @param channelId  identity of the channel
     * @param connection connection the channel belongs to
     */
    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }

    /**
     * Tells the server that the client created a new channel (request type 0x1).
     *
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean createChannel() throws IOException {
        String rid = generateRid();
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setRid(rid);
        basicArguments.setChannelId(channelId);
        return sendAndWait(TYPE_CREATE_CHANNEL, rid, BinaryTool.toBytes(basicArguments));
    }

    /**
     * Wraps an already serialized payload in a request of the given type, writes it to the
     * connection and blocks until the matching response arrives.
     *
     * @param type    request type code
     * @param rid     request id that the response is expected to carry
     * @param payload serialized argument object
     * @return the {@code ok} flag of the response
     * @throws IOException if writing the request fails
     */
    private boolean sendAndWait(int type, String rid, byte[] payload) throws IOException {
        Request request = new Request();
        request.setType(type);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        return waitResult(rid).isOk();
    }

    /**
     * Blocks until the response for {@code rid} is available, removes it from the pending map and
     * returns it.
     *
     * <p>The presence check and the {@code wait()} happen while holding this object's monitor, so a
     * response published by {@link #putReturns(BasicReturns)} between the check and the wait cannot
     * be missed. If the thread is interrupted while waiting, waiting continues (this method cannot
     * report the interruption) and the interrupt status is restored before returning.
     *
     * @param rid request id; request and response rid must match
     * @return the response for {@code rid}
     */
    private BasicReturns waitResult(String rid) {
        boolean interrupted = false;
        try {
            synchronized (this) {
                BasicReturns basicReturns;
                // remove() both tests for and consumes the response in one atomic step.
                while ((basicReturns = basicReturnsMap.remove(rid)) == null) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        // Remember the interrupt; re-asserting it now would make wait() throw
                        // again immediately and turn this loop into a busy spin.
                        interrupted = true;
                    }
                }
                return basicReturns;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores a response in the pending map and wakes up all threads blocked waiting for a
     * response on this channel.
     *
     * @param basicReturns response received from the server
     */
    public void putReturns(BasicReturns basicReturns) {
        synchronized (this) {
            basicReturnsMap.put(basicReturns.getRid(), basicReturns);
            // Several callers may be waiting for different rids; each re-checks for its own.
            notifyAll();
        }
    }

    /**
     * Generates a request id.
     *
     * @return {@code "R-"} followed by a random UUID
     */
    private String generateRid() {
        return "R-" + UUID.randomUUID().toString();
    }

    /**
     * Closes the channel by sending a request of type 0x2 to the server.
     *
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean close() throws IOException {
        String rid = generateRid();
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setRid(rid);
        basicArguments.setChannelId(channelId);
        return sendAndWait(TYPE_CLOSE_CHANNEL, rid, BinaryTool.toBytes(basicArguments));
    }

    /**
     * Declares an exchange by sending a request of type 0x3 to the server.
     *
     * @param exchangeName exchange name
     * @param exchangeType exchange type
     * @param durable      whether the exchange is persisted
     * @param autoDelete   whether the exchange is deleted automatically
     * @param arguments    custom settings
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException {
        String rid = generateRid();
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setRid(rid);
        exchangeDeclareArguments.setChannelId(channelId);
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        exchangeDeclareArguments.setAutoDelete(autoDelete);
        exchangeDeclareArguments.setArguments(arguments);
        return sendAndWait(TYPE_EXCHANGE_DECLARE, rid, BinaryTool.toBytes(exchangeDeclareArguments));
    }

    /**
     * Deletes an exchange by sending a request of type 0x4 to the server.
     *
     * @param exchangeName exchange name
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean exchangeDelete(String exchangeName) throws IOException {
        String rid = generateRid();
        ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();
        exchangeDeleteArguments.setRid(rid);
        exchangeDeleteArguments.setExchangeName(exchangeName);
        exchangeDeleteArguments.setChannelId(channelId);
        return sendAndWait(TYPE_EXCHANGE_DELETE, rid, BinaryTool.toBytes(exchangeDeleteArguments));
    }

    /**
     * Declares a queue by sending a request of type 0x5 to the server.
     *
     * @param queueName  queue name
     * @param durable    whether the queue is persisted
     * @param exclusive  whether the queue is exclusive
     * @param autoDelete whether the queue is deleted automatically
     * @param arguments  custom settings
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
                                Map<String, Object> arguments) throws IOException {
        String rid = generateRid();
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setRid(rid);
        queueDeclareArguments.setChannelId(channelId);
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setDurable(durable);
        queueDeclareArguments.setExclusive(exclusive);
        queueDeclareArguments.setAutoDelete(autoDelete);
        queueDeclareArguments.setArguments(arguments);
        return sendAndWait(TYPE_QUEUE_DECLARE, rid, BinaryTool.toBytes(queueDeclareArguments));
    }

    /**
     * Deletes a queue by sending a request of type 0x6 to the server.
     *
     * @param queueName queue name
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean queueDelete(String queueName) throws IOException {
        String rid = generateRid();
        QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
        queueDeleteArguments.setRid(rid);
        queueDeleteArguments.setChannelId(channelId);
        queueDeleteArguments.setQueueName(queueName);
        return sendAndWait(TYPE_QUEUE_DELETE, rid, BinaryTool.toBytes(queueDeleteArguments));
    }

    /**
     * Creates a binding by sending a request of type 0x7 to the server.
     *
     * @param queueName    queue name
     * @param exchangeName exchange name
     * @param bindingKey   binding key
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
        String rid = generateRid();
        QueueBindArguments queueBindArguments = new QueueBindArguments();
        queueBindArguments.setRid(rid);
        queueBindArguments.setChannelId(channelId);
        queueBindArguments.setQueueName(queueName);
        queueBindArguments.setExchangeName(exchangeName);
        queueBindArguments.setBindingKey(bindingKey);
        return sendAndWait(TYPE_QUEUE_BIND, rid, BinaryTool.toBytes(queueBindArguments));
    }

    /**
     * Removes a binding by sending a request of type 0x8 to the server.
     *
     * @param queueName    queue name
     * @param exchangeName exchange name
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
        String rid = generateRid();
        QueueUnbindArguments queueUnbindArguments = new QueueUnbindArguments();
        queueUnbindArguments.setRid(rid);
        queueUnbindArguments.setChannelId(channelId);
        queueUnbindArguments.setQueueName(queueName);
        queueUnbindArguments.setExchangeName(exchangeName);
        return sendAndWait(TYPE_QUEUE_UNBIND, rid, BinaryTool.toBytes(queueUnbindArguments));
    }

    /**
     * Publishes a message by sending a request of type 0x9 to the server.
     *
     * @param exchangeName    exchange name
     * @param routingKey      for a topic exchange: the key carried by the message, matched against
     *                        binding keys; for a direct exchange: the name of the queue to forward
     *                        to; for a fanout exchange: not meaningful
     * @param basicProperties message properties
     * @param body            message body
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
        String rid = generateRid();
        BasicPublishArguments basicPublishArguments = new BasicPublishArguments();
        basicPublishArguments.setRid(rid);
        basicPublishArguments.setChannelId(channelId);
        basicPublishArguments.setExchangeName(exchangeName);
        basicPublishArguments.setRoutingKey(routingKey);
        basicPublishArguments.setBasicProperties(basicProperties);
        basicPublishArguments.setBody(body);
        return sendAndWait(TYPE_BASIC_PUBLISH, rid, BinaryTool.toBytes(basicPublishArguments));
    }

    /**
     * Subscribes to a queue by sending a request of type 0xa to the server.
     *
     * <p>The callback is registered before the request is sent. If the subscription does not
     * succeed (the server reports failure or sending throws), the callback is cleared again so
     * that the channel can be used for another subscription attempt.
     *
     * @param queueName name of the queue to subscribe to
     * @param autoAck   whether messages are acknowledged automatically
     * @param consumer  callback for delivered messages
     * @return {@code true} if the server reports success
     * @throws MqException if this channel already has a callback registered
     * @throws IOException if serializing or writing the request fails
     */
    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
        if (this.consumer != null) {
            throw new MqException("该 channel 已经设置过消费消息的回调, 不可重复设置!");
        }
        this.consumer = consumer;
        boolean subscribed = false;
        try {
            String rid = generateRid();
            BasicConsumeArguments arguments = new BasicConsumeArguments();
            arguments.setRid(rid);
            arguments.setChannelId(channelId);
            // The channel id doubles as the consumer tag.
            arguments.setConsumerTag(channelId);
            arguments.setQueueName(queueName);
            arguments.setAutoAck(autoAck);
            subscribed = sendAndWait(TYPE_BASIC_CONSUME, rid, BinaryTool.toBytes(arguments));
            return subscribed;
        } finally {
            if (!subscribed) {
                // Roll back, otherwise a failed attempt would block every later basicConsume call.
                this.consumer = null;
            }
        }
    }

    /**
     * Manually acknowledges a message by sending a request of type 0xb to the server.
     *
     * @param queueName queue name
     * @param messageId message id
     * @return {@code true} if the server reports success
     * @throws IOException if serializing or writing the request fails
     */
    public boolean basicAck(String queueName, String messageId) throws IOException {
        String rid = generateRid();
        BasicAckArguments arguments = new BasicAckArguments();
        arguments.setRid(rid);
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        arguments.setMessageId(messageId);
        return sendAndWait(TYPE_BASIC_ACK, rid, BinaryTool.toBytes(arguments));
    }

    /** @return identity of this channel */
    public String getChannelId() {
        return channelId;
    }

    /** @param channelId new identity of this channel */
    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    /** @return the connection this channel belongs to */
    public Connection getConnection() {
        return connection;
    }

    /** @param connection the connection this channel belongs to */
    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    /** @return the live map of responses that have not been picked up yet, keyed by rid */
    public ConcurrentHashMap<String, BasicReturns> getBasicReturnsMap() {
        return basicReturnsMap;
    }

    /** @param basicReturnsMap replacement map of pending responses, keyed by rid */
    public void setBasicReturnsMap(ConcurrentHashMap<String, BasicReturns> basicReturnsMap) {
        this.basicReturnsMap = basicReturnsMap;
    }

    /** @return the registered message callback, or {@code null} if none is set */
    public Consumer getConsumer() {
        return consumer;
    }

    /** @param consumer message callback to register; replaces any existing one without checks */
    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }
}
